-
Notifications
You must be signed in to change notification settings - Fork 1.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Restore thread starvation tests #12395
base: jetty-12.0.x
Are you sure you want to change the base?
Restore thread starvation tests #12395
Conversation
Signed-off-by: Ludovic Orban <lorban@bitronix.be>
Signed-off-by: Ludovic Orban <lorban@bitronix.be>
Signed-off-by: Ludovic Orban <lorban@bitronix.be>
@lorban I've pushed a fixed for the read side of servlets in ee10. Thoughts? |
@lorban @sbordet we could extend CompletableFuture to an InvocableCompletableFuture that either:
Note the returned completablefuture might also need to be wrapped (maybe not for the andThenAsync redirection). |
…OCKING Signed-off-by: Ludovic Orban <lorban@bitronix.be>
Signed-off-by: Ludovic Orban <lorban@bitronix.be>
Signed-off-by: Ludovic Orban <lorban@bitronix.be>
@lorban @sbordet would this help: class NonBlockingCompletableFuture<V> extends CompletableFuture<V> implements Invocable
{
private final Executor _executor;
public NonBlockingCompletableFuture(Executor executor)
{
_executor = executor;
}
@Override
public InvocationType getInvocationType()
{
return InvocationType.NON_BLOCKING;
}
@Override
public CompletableFuture<Void> acceptEither(CompletionStage<? extends V> other, Consumer<? super V> action)
{
return super.acceptEitherAsync(other, action, _executor);
}
@Override
public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends V> other, Function<? super V, U> fn)
{
return super.applyToEitherAsync(other, fn, _executor);
}
@Override
public CompletableFuture<V> exceptionallyCompose(Function<Throwable, ? extends CompletionStage<V>> fn)
{
return super.exceptionallyComposeAsync(fn, _executor);
}
@Override
public <U> CompletableFuture<U> handle(BiFunction<? super V, Throwable, ? extends U> fn)
{
return super.handleAsync(fn, _executor);
}
@Override
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action)
{
return super.runAfterBothAsync(other, action, _executor);
}
@Override
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action)
{
return super.runAfterEitherAsync(other, action, _executor);
}
@Override
public CompletableFuture<Void> thenAccept(Consumer<? super V> action)
{
return super.thenAcceptAsync(action, _executor);
}
@Override
public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super V, ? super U> action)
{
return super.thenAcceptBothAsync(other, action, _executor);
}
@Override
public <U> CompletableFuture<U> thenApply(Function<? super V, ? extends U> fn)
{
return super.thenApplyAsync(fn, _executor);
}
@Override
public <U, V1> CompletableFuture<V1> thenCombine(CompletionStage<? extends U> other, BiFunction<? super V, ? super U, ? extends V1> fn)
{
return super.thenCombineAsync(other, fn, _executor);
}
@Override
public <U> CompletableFuture<U> thenCompose(Function<? super V, ? extends CompletionStage<U>> fn)
{
return super.thenComposeAsync(fn, _executor);
}
@Override
public CompletableFuture<Void> thenRun(Runnable action)
{
return super.thenRunAsync(action, _executor);
}
@Override
public CompletableFuture<V> whenComplete(BiConsumer<? super V, ? super Throwable> action)
{
return super.whenCompleteAsync(action, _executor);
}
} |
I honestly would not go there. I think we can just say in the javadocs to never block on the CF returned, or deprecate the API. |
We need more than javadocs, as we need the Runnable or Callback created from the CF to be Invocable. So how about: /**
* An extension of {@link java.util.concurrent.CompletableFuture} that is an {@link Invocable}.
* The {@link InvocationType} is initially the type used in construction (default NON_BLOCKING).
* If a non async method is called, then the invocation type of any passed function is used.
* @param <V>
*/
class InvocableCompletableFuture<V> extends java.util.concurrent.CompletableFuture<V> implements Invocable
{
private final AtomicReference<InvocationType> _invocationType = new AtomicReference<>();
public InvocableCompletableFuture()
{
this(null);
}
public InvocableCompletableFuture(InvocationType invocationType)
{
_invocationType.set(Objects.requireNonNullElse(invocationType, InvocationType.NON_BLOCKING));
}
@Override
public InvocationType getInvocationType()
{
return _invocationType.get();
}
@Override
public java.util.concurrent.CompletableFuture<Void> acceptEither(CompletionStage<? extends V> other, Consumer<? super V> action)
{
_invocationType.set(Invocable.combine(Invocable.getInvocationType(other), Invocable.getInvocationType(action)));
return super.acceptEither(other, action);
}
@Override
public <U> java.util.concurrent.CompletableFuture<U> applyToEither(CompletionStage<? extends V> other, Function<? super V, U> fn)
{
_invocationType.set(Invocable.combine(Invocable.getInvocationType(other), Invocable.getInvocationType(fn)));
return super.applyToEither(other, fn);
}
@Override
public <U> java.util.concurrent.CompletableFuture<U> handle(BiFunction<? super V, Throwable, ? extends U> fn)
{
_invocationType.set(Invocable.getInvocationType(fn));
return super.handle(fn);
}
@Override
public java.util.concurrent.CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action)
{
_invocationType.set(Invocable.combine(Invocable.getInvocationType(other), Invocable.getInvocationType(action)));
return super.runAfterBoth(other, action);
}
@Override
public java.util.concurrent.CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action)
{
_invocationType.set(Invocable.combine(Invocable.getInvocationType(other), Invocable.getInvocationType(action)));
return super.runAfterEither(other, action);
}
@Override
public java.util.concurrent.CompletableFuture<Void> thenAccept(Consumer<? super V> action)
{
_invocationType.set(Invocable.getInvocationType(action));
return super.thenAccept(action);
}
@Override
public <U> java.util.concurrent.CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super V, ? super U> action)
{
_invocationType.set(Invocable.combine(Invocable.getInvocationType(other), Invocable.getInvocationType(action)));
return super.thenAcceptBoth(other, action);
}
@Override
public <U> java.util.concurrent.CompletableFuture<U> thenApply(Function<? super V, ? extends U> fn)
{
_invocationType.set(Invocable.getInvocationType(fn));
return super.thenApply(fn);
}
@Override
public <U, V1> java.util.concurrent.CompletableFuture<V1> thenCombine(CompletionStage<? extends U> other, BiFunction<? super V, ? super U, ? extends V1> fn)
{
_invocationType.set(Invocable.combine(Invocable.getInvocationType(other), Invocable.getInvocationType(fn)));
return super.thenCombine(other, fn);
}
@Override
public <U> java.util.concurrent.CompletableFuture<U> thenCompose(Function<? super V, ? extends CompletionStage<U>> fn)
{
_invocationType.set(Invocable.getInvocationType(fn));
return super.thenCompose(fn);
}
@Override
public java.util.concurrent.CompletableFuture<Void> thenRun(Runnable action)
{
_invocationType.set(Invocable.getInvocationType(action));
return super.thenRun(action);
}
@Override
public java.util.concurrent.CompletableFuture<V> whenComplete(BiConsumer<? super V, ? super Throwable> action)
{
_invocationType.set(Invocable.getInvocationType(action));
return super.whenComplete(action);
}
} |
That would then be used with: public abstract class ContentSourceCompletableFuture<X> extends Invocable.InvocableCompletableFuture<X> implements Runnable
{
private final Content.Source _content;
public ContentSourceCompletableFuture(Content.Source content)
{
_content = content;
}
/**
* <p>Initiates the parsing of the {@link Content.Source}.</p>
* <p>For every valid chunk that is read, {@link #parse(Content.Chunk)}
* is called, until a result is produced that is used to
* complete this {@link CompletableFuture}.</p>
* <p>Internally, this method is called multiple times to progress
* the parsing in response to {@link Content.Source#demand(Runnable)}
* calls.</p>
* <p>Exceptions thrown during parsing result in this
* {@link CompletableFuture} to be completed exceptionally.</p>
*/
public void parse()
{
while (true)
{
Content.Chunk chunk = _content.read();
if (chunk == null)
{
_content.demand(this);
return;
}
if (Content.Chunk.isFailure(chunk))
{
if (chunk.isLast())
{
completeExceptionally(chunk.getFailure());
}
else
{
if (onTransientFailure(chunk.getFailure()))
continue;
_content.fail(chunk.getFailure());
completeExceptionally(chunk.getFailure());
}
return;
}
try
{
X x = parse(chunk);
if (x != null)
{
complete(x);
return;
}
}
catch (Throwable failure)
{
completeExceptionally(failure);
return;
}
finally
{
chunk.release();
}
if (chunk.isLast())
{
completeExceptionally(new EOFException());
return;
}
}
}
@Override
public void run()
{
parse();
}
/**
* <p>Called by {@link #parse()} to parse a {@link org.eclipse.jetty.io.Content.Chunk}.</p>
*
* @param chunk The chunk containing content to parse. The chunk will never be {@code null} nor a
* {@link org.eclipse.jetty.io.Content.Chunk#isFailure(Content.Chunk) failure chunk}.
* If the chunk is stored away to be used later beyond the scope of this call,
* then implementations must call {@link Content.Chunk#retain()} and
* {@link Content.Chunk#release()} as appropriate.
* @return The parsed {@code X} result instance or {@code null} if parsing is not yet complete
* @throws Throwable If there is an error parsing
*/
protected abstract X parse(Content.Chunk chunk) throws Throwable;
/**
* <p>Callback method that informs the parsing about how to handle transient failures.</p>
*
* @param cause A transient failure obtained by reading a {@link Content.Chunk#isLast() non-last}
* {@link org.eclipse.jetty.io.Content.Chunk#isFailure(Content.Chunk) failure chunk}
* @return {@code true} if the transient failure can be ignored, {@code false} otherwise
*/
protected boolean onTransientFailure(Throwable cause)
{
return false;
}
} |
I think there is no problem changing But, the current approach doesn't need this. |
Signed-off-by: Ludovic Orban <lorban@bitronix.be>
IMHO the That being said, I think the main problem I have with that API is that it makes it so easy to write broken code that I'm tempted to say it encourages. This simple example brings back the original problem in a non-obvious way: String s = Content.Source.asStringAsync(source, charset).thenRun(() -> { /* whatever */ }).get(); The above is my main motivation to say we should deprecate all our Back to the main problem, I remember a few months ago I said that we made a mistake when we designed the |
Signed-off-by: Ludovic Orban <lorban@bitronix.be>
Signed-off-by: Ludovic Orban <lorban@bitronix.be>
Signed-off-by: Ludovic Orban <lorban@bitronix.be>
Signed-off-by: Ludovic Orban <lorban@bitronix.be>
Signed-off-by: Ludovic Orban <lorban@bitronix.be>
Signed-off-by: Ludovic Orban <lorban@bitronix.be>
@lorban unfortunately it's not a compatible change: we have several occurrences of allocating a |
They will be, as Furthermore, it would not solve the problem of a double composition, e.g. I really would not go into trying to put another smartness to fix this, but rather just document clearly the API or deprecate it. |
Signed-off-by: Ludovic Orban <lorban@bitronix.be>
I've just noticed some other complicating factor we must be careful about: a lot of places where an This means if we ever decide to manipulate the |
@gregw, also, |
The main difference is assuming that the `Runnable` passed to `Content.Source.demand(Runnable)` is non-blocking unless explicitly declared so. This leaves most of the application code using lambdas and method references unchanged. Deprecated usages of `CompletableFuture` APIs in `Content.Source`. Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
See alternative at #12406. |
This is somewhat of an issue, but it is something we have elsewhere in the code (see dyanamic handler invocation type). It is something worth javadocing, but I do not think it is fixable. |
"This fix" is not attempting this smartness we are discussing here.... but I think it is worthwhile discussing it so that we fully understand the problem and possible ways out.
Firstly, this uses a passed executor, so there is no common pool thread involved, only our executor. But I do not think this changes behaviour. If our thread pool is exhausted, currently our CF is seen as blocking and just never called, so deadlock happens always. If we did a change based on converting non-async methods to async, then in the blocking case, we still hit the exhausted thread pool and are deadlocked.
Yeah, that would require further wrapping... hence I don't like this approach. So I'm not proposing we do this, and this PR is not doing it. But I do believe that if we want to control the thread that does the callback, it is within our authority both as the caller and as the implementor of the CF, so there would be no surprise. |
I think we should fix all our CF usages (it is not proving to be too difficult) and then separately consider if it is indeed the best API to use for many of our utility classes. As you say, it is orthogonal. There are several ways we can "fix" our existing CF usage:
My contributions to this PR implement 1). They are not too complex, appear to be working, but are vulnerable to dynamic changes of invocation type. I don't mind 3) or 4) as they solve the dynamic type issue. 3) is a big neutering of the CF API and really shouts that we should not use it and do something else. 4) is less so, as it gives a way all the API can be used. Ultimately, we have always known that the invocation type of our callbacks is important and previously we have always dealt with it. For some reason, we forgot that for a little while and introduced a few utility classes that didn't do the work required to ensure a good invocation type. I think least surprise is to not radically change our API/contract and to just go back to caring about invocation type in all our utility classes and better javadoc of the necessity to do so for others that implement the API directly (we could even give warning for non invocable demand callbacks?). There is still a role for Blocking callbacks to demand. It is just that they cannot be used if their only job is to wakeup an otherwise blocked thread, as that is deadlock. If the demand callback is genuinely blocking (eg on a database), then it will just have to wait until a thread is available before it is called and then it can block on the database. It just cannot wait if its job is to free up a thread that would go back to the pool to call itself in the first place. |
For our CF based utility classes, we have two distinct styles of usage:
We typically do not mix these styles, other than the However, I don't think my fix here is good for the 2) usage. The parsing, and hence demanding, is commenced within the CompletableFuture<Fields> futureFormFields = FormFields.from(getRequest(), _charset, InvocationType.BLOCKING);
// if we are done already, then we are still in the scope of the original process call and can
// process directly, otherwise we must execute a call to process as we are within a serialized
// demand callback.
boolean done = futureFormFields.isDone();
futureFormFields.whenComplete(done ? this::process : this::executeProcess); In this case the CF returned should have the public T get()
{
if (getInvocationType() != InvocationType.NON_BLOCKING && !isDone())
throw new IllegalStateException("Cannot call get unless NON_BLOCKING or complete");
return super.get();
} But it would be a bit strange as we'd then need to do in Fields formFields = FormFields.from(request, InvocationType.NON_BLOCKING).get(); which I think is correct, but reads strangely. It is also strange that the passed invocation type is only used if this is the first call to As I think the current "fix" is wrong. I'll change it today to do this. |
…214-restore-thread-starvation-tests
The CF fix works but is counter intuitive, fragile and mind blowing when combining CFs as we do going from core Parts to servlet Parts. So I've pushed some changes that deprecate all the CF APIs in Fields and MultiPart and replace with explicit |
Signed-off-by: Ludovic Orban <lorban@bitronix.be>
@gregw I have not looked yet, but regarding this comment:
That is true, but what I would like to see is actually the following:
In this way, the semantic of Then, if the application blocks on the I'll review as soon as I am back. |
Signed-off-by: Ludovic Orban <lorban@bitronix.be>
@sbordet I think we are agreed that ultimately we want to remove CF for our implementation and API. However, currently it is in our API (although deprecated by this PR), so we do need to fix it. Hence I think we should have the extended CF (also deprecated) as an interim fix until we can remove the CFs entirely. So the key things we need to do in this PR is:
As we are close to release time, can you focus on reviewing 2), as new API is forever. I like the general style of the new API in this PR: void onParts(Request request, BiConsumer<Parts, Throwable> immediate, InvocableBiConsumer<Parts, Throwable> future);
default void onParts(Request request, InvocableBiConsumer<Parts, Throwable> future)
{ onParts(request, future, future); } Thoughts about this are:
An alternative would be: void onParts(Request request, Promise<Parts> immediate, Promise<Parts> future);
default void onParts(Request request, Promise<Parts> future)
{ onParts(request, future, future); } Thoughts:
|
@lorban thoughts on the API comment above? |
I like
but I do not like that it is not I don't mind that this API is not as fluent as I don't like Oh, and there are other places where we expose |
WIP
testReadStarvation
still is broken in ee9, ee10 and core.